package xiaoa.java.spider.save;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.bson.Document;

import com.mongodb.BasicDBObject;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.BsonField;
import com.mongodb.client.model.Filters;

import xiaoa.java.log.L;
import xiaoa.java.mongoDB.DbMgr;
import xiaoa.java.mongoDB.DbUtils;
import xiaoa.java.spider.db.vo.FetchUrl;

/**
 * Worker that persists newly discovered {@link FetchUrl}s.
 * <p>
 * Each iteration drains up to {@value #BATCH_SIZE} URLs from the shared queue,
 * removes URLs that already exist in the {@code FetchUrl} collection (as well as
 * repeats within the same batch), stamps the survivors with a creation time and
 * {@code FetchUrl.STATE_WAIT}, and inserts them via {@code DbMgr.insertMany}.
 * The loop runs until the thread is interrupted.
 *
 * @author xiaoa
 * @date 2017-10-08 08:51:51
 * @version V1.0
 *
 */
public class SaveRun implements Runnable {

	/** Maximum number of URLs drained from the queue and persisted per iteration. */
	private static final int BATCH_SIZE = 500;

	/** How long to wait before polling again when the queue was empty. */
	private static final long IDLE_SLEEP_SECONDS = 5;

	/** Back-off after an unexpected failure, so a failing database is not hammered. */
	private static final long ERROR_SLEEP_SECONDS = 10;

	/** Queue of URLs waiting to be saved; the producers are not visible in this class. */
	BlockingQueue<FetchUrl> queue;

	/**
	 * @param queue queue this worker drains URLs from
	 */
	public SaveRun(BlockingQueue<FetchUrl> queue) {
		this.queue = queue;
	}

	/**
	 * Main loop: drain a batch, filter, stamp, insert. Unexpected errors are printed
	 * and followed by a back-off; interruption restores the interrupt flag and ends
	 * the loop.
	 */
	@Override
	public void run() {
		Thread current = Thread.currentThread();
		current.setName("save_" + current.getName());

		while (!current.isInterrupted()) {
			try {
				long startTime = System.currentTimeMillis();

				// Take at most BATCH_SIZE URLs without blocking.
				List<FetchUrl> batch = new ArrayList<>(BATCH_SIZE);
				queue.drainTo(batch, BATCH_SIZE);

				if (batch.isEmpty()) {
					Thread.sleep(TimeUnit.SECONDS.toMillis(IDLE_SLEEP_SECONDS));
					continue;
				}

				// Drop URLs already stored, or repeated within this batch.
				filter(batch);
				if (batch.isEmpty()) {
					continue;
				}

				// Initialise the bookkeeping fields of the new records.
				for (FetchUrl url : batch) {
					url.setCreateTime(new Date());
					url.setState(FetchUrl.STATE_WAIT);
				}

				DbMgr.insertMany(batch, FetchUrl.class);

				L.info(startTime, System.currentTimeMillis(), "插入成功 count ：  " + batch.size());

			} catch (InterruptedException e) {
				// Interruption is the shutdown signal: keep the flag set for the owner and stop.
				current.interrupt();
				return;
			} catch (Throwable e) {
				e.printStackTrace();
				try {
					Thread.sleep(TimeUnit.SECONDS.toMillis(ERROR_SLEEP_SECONDS));
				} catch (InterruptedException e1) {
					current.interrupt();
					return;
				}
			}
		}
	}

	/**
	 * Removes, in place, every URL that is already present in the {@code FetchUrl}
	 * collection, plus every repeat of a URL that appeared earlier in the list.
	 * <p>
	 * Existing URLs are found with a single aggregation ({@code $match} on
	 * {@code url in batch}, then {@code $group} by url with a count), so the whole
	 * batch costs one round trip.
	 *
	 * @param list batch to filter; modified in place through {@link Iterator#remove()}.
	 *             {@code null} or empty is a no-op.
	 * @throws Throwable anything thrown while accessing MongoDB
	 * @author xiaoa
	 */
	private void filter(List<FetchUrl> list) throws Throwable {

		if (list == null || list.isEmpty()) {
			return;
		}

		long startTime = System.currentTimeMillis();

		List<String> urlList = new ArrayList<>(list.size());
		for (FetchUrl url : list) {
			urlList.add(url.getUrl());
		}

		MongoCollection<Document> mongoCollection =
				DbMgr.getMongoCollection(DbUtils.getCollectionName(FetchUrl.class));

		// url -> number of stored documents with that url; URLs not yet stored are absent.
		Map<String, Integer> existMap = new HashMap<>();

		// Close the cursor even if iteration fails, so the server-side cursor is released.
		try (MongoCursor<Document> docIt = mongoCollection.aggregate(Arrays.asList(
				Aggregates.match(Filters.in("url", urlList)),
				Aggregates.group("$url", new BsonField("count", new BasicDBObject("$sum", 1)))))
				.iterator()) {
			while (docIt.hasNext()) {
				Document doc = docIt.next();
				// _id of a $group stage is the grouping key, i.e. the url.
				existMap.put(doc.getString("_id"), doc.getInteger("count"));
			}
		}

		// Visit every element (including the last one) and remove unwanted entries.
		Set<String> seen = new HashSet<>();
		Iterator<FetchUrl> it = list.iterator();
		while (it.hasNext()) {
			String url = it.next().getUrl();
			if (existMap.containsKey(url)) {
				L.debug("filter : url = " + url + " count = " + existMap.get(url));
				it.remove();
			} else if (!seen.add(url)) {
				L.debug("filter : duplicate in batch, url = " + url);
				it.remove();
			}
		}

		L.info("filter  list.size = " + list.size() + "  use = " + (System.currentTimeMillis() - startTime));
	}

}
